<!DOCTYPE html>



  


<html class="theme-next muse use-motion" lang="zh-Hans">
<head>
  <meta charset="UTF-8"/>
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1"/>
<meta name="theme-color" content="#222">









<meta http-equiv="Cache-Control" content="no-transform" />
<meta http-equiv="Cache-Control" content="no-siteapp" />
















  
  
  <link href="/lib/fancybox/source/jquery.fancybox.css?v=2.1.5" rel="stylesheet" type="text/css" />







<link href="/lib/font-awesome/css/font-awesome.min.css?v=4.6.2" rel="stylesheet" type="text/css" />

<link href="/css/main.css?v=5.1.4" rel="stylesheet" type="text/css" />


  <link rel="apple-touch-icon" sizes="180x180" href="/images/apple-touch-icon-next.png?v=5.1.4">


  <link rel="icon" type="image/png" sizes="32x32" href="/images/favicon-32x32-next.png?v=5.1.4">


  <link rel="icon" type="image/png" sizes="16x16" href="/images/favicon-16x16-next.png?v=5.1.4">


  <link rel="mask-icon" href="/images/logo.svg?v=5.1.4" color="#222">





  <meta name="keywords" content="Hexo, NexT" />










<meta name="description" content="本文将按照《RocketMQ 多副本前置篇：初探raft协议》的思路来学习RocketMQ选主逻辑。首先先回顾一下关于Leader的一些思考：  节点状态 需要引入3种节点状态：Follower(跟随者)、Candidate(候选者)，该状态下的节点会发起投票请求，Leader(主节点)。 选举计时器 Follower、Candidate两个状态时，需要维护一个定时器，每次定时时间从150ms-3">
<meta property="og:type" content="article">
<meta property="og:title" content="源码分析 RocketMQ DLedger 多副本之 Leader 选主">
<meta property="og:url" content="https://www.codingw.net/posts/8b1a2fc4.html">
<meta property="og:site_name" content="中间件兴趣圈">
<meta property="og:description" content="本文将按照《RocketMQ 多副本前置篇：初探raft协议》的思路来学习RocketMQ选主逻辑。首先先回顾一下关于Leader的一些思考：  节点状态 需要引入3种节点状态：Follower(跟随者)、Candidate(候选者)，该状态下的节点会发起投票请求，Leader(主节点)。 选举计时器 Follower、Candidate两个状态时，需要维护一个定时器，每次定时时间从150ms-3">
<meta property="og:locale">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20190817201207350.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20190817201557322.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="og:image" content="https://img-blog.csdnimg.cn/20190817204737273.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">
<meta property="article:published_time" content="2020-08-23T15:43:01.000Z">
<meta property="article:modified_time" content="2021-04-26T12:08:22.664Z">
<meta property="article:author" content="中间件兴趣圈">
<meta property="article:tag" content="中间件">
<meta name="twitter:card" content="summary">
<meta name="twitter:image" content="https://img-blog.csdnimg.cn/20190817201207350.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70">



<script type="text/javascript" id="hexo.configurations">
  var NexT = window.NexT || {};
  var CONFIG = {
    root: '',
    scheme: 'Muse',
    version: '5.1.4',
    sidebar: {"position":"left","display":"post","offset":12,"b2t":false,"scrollpercent":false,"onmobile":false},
    fancybox: true,
    tabs: true,
    motion: {"enable":true,"async":false,"transition":{"post_block":"fadeIn","post_header":"slideDownIn","post_body":"slideDownIn","coll_header":"slideLeftIn","sidebar":"slideUpIn"}},
    duoshuo: {
      userId: '0',
      author: '博主'
    },
    algolia: {
      applicationID: '',
      apiKey: '',
      indexName: '',
      hits: {"per_page":10},
      labels: {"input_placeholder":"Search for Posts","hits_empty":"We didn't find any results for the search: ${query}","hits_stats":"${hits} results found in ${time} ms"}
    }
  };
</script>



  <link rel="canonical" href="https://www.codingw.net/posts/8b1a2fc4.html"/>





  <title>源码分析 RocketMQ DLedger 多副本之 Leader 选主 | 中间件兴趣圈</title>
  








<meta name="generator" content="Hexo 5.4.0"></head>

<body itemscope itemtype="http://schema.org/WebPage" lang="zh-Hans">

  
  
    
  

  <div class="container sidebar-position-left page-post-detail">
    <div class="headband"></div>

    <header id="header" class="header" itemscope itemtype="http://schema.org/WPHeader">
      <div class="header-inner"><div class="site-brand-wrapper">
  <div class="site-meta ">
    

    <div class="custom-logo-site-title">
      <a href="/"  class="brand" rel="start">
        <span class="logo-line-before"><i></i></span>
        <span class="site-title">中间件兴趣圈</span>
        <span class="logo-line-after"><i></i></span>
      </a>
    </div>
      
        <p class="site-subtitle">微信搜『中间件兴趣圈』，回复『Java』获取200本优质电子书</p>
      
  </div>

  <div class="site-nav-toggle">
    <button>
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
      <span class="btn-bar"></span>
    </button>
  </div>
</div>

<nav class="site-nav">
  

  
    <ul id="menu" class="menu">
      
        
        <li class="menu-item menu-item-home">
          <a href="/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-home"></i> <br />
            
            首页
          </a>
        </li>
      
        
        <li class="menu-item menu-item-categories">
          <a href="/categories/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-question-circle"></i> <br />
            
            分类
          </a>
        </li>
      
        
        <li class="menu-item menu-item-archives">
          <a href="/archives/" rel="section">
            
              <i class="menu-item-icon fa fa-fw fa-question-circle"></i> <br />
            
            归档
          </a>
        </li>
      

      
    </ul>
  

  
</nav>



 </div>
    </header>

    <main id="main" class="main">
      <div class="main-inner">
        <div class="content-wrap">
          <div id="content" class="content">
            

  <div id="posts" class="posts-expand">
    

  

  
  
  

  <article class="post post-type-normal" itemscope itemtype="http://schema.org/Article">
  
  
  
  <div class="post-block">
    <link itemprop="mainEntityOfPage" href="https://www.codingw.net/posts/8b1a2fc4.html">

    <span hidden itemprop="author" itemscope itemtype="http://schema.org/Person">
      <meta itemprop="name" content="">
      <meta itemprop="description" content="">
      <meta itemprop="image" content="/images/avatar.gif">
    </span>

    <span hidden itemprop="publisher" itemscope itemtype="http://schema.org/Organization">
      <meta itemprop="name" content="中间件兴趣圈">
    </span>

    
      <header class="post-header">

        
        
          <h1 class="post-title" itemprop="name headline">源码分析 RocketMQ DLedger 多副本之 Leader 选主</h1>
        

        <div class="post-meta">
          <span class="post-time">
            
              <span class="post-meta-item-icon">
                <i class="fa fa-calendar-o"></i>
              </span>
              
                <span class="post-meta-item-text">发表于</span>
              
              <time title="创建于" itemprop="dateCreated datePublished" datetime="2020-08-23T23:43:01+08:00">
                2020-08-23
              </time>
            

            

            
          </span>

          
            <span class="post-category" >
            
              <span class="post-meta-divider">|</span>
            
              <span class="post-meta-item-icon">
                <i class="fa fa-folder-o"></i>
              </span>
              
                <span class="post-meta-item-text">分类于</span>
              
              
                <span itemprop="about" itemscope itemtype="http://schema.org/Thing">
                  <a href="/categories/rocketmq/" itemprop="url" rel="index">
                    <span itemprop="name">rocketmq</span>
                  </a>
                </span>

                
                
              
            </span>
          

          
            
          

          
          
             <span id="/posts/8b1a2fc4.html" class="leancloud_visitors" data-flag-title="源码分析 RocketMQ DLedger 多副本之 Leader 选主">
               <span class="post-meta-divider">|</span>
               <span class="post-meta-item-icon">
                 <i class="fa fa-eye"></i>
               </span>
               
                 <span class="post-meta-item-text">阅读次数&#58;</span>
               
                 <span class="leancloud-visitors-count"></span>
             </span>
          

          
            <span class="post-meta-divider">|</span>
            <span class="page-pv"><i class="fa fa-file-o"></i>
            <span class="busuanzi-value" id="busuanzi_value_page_pv" ></span>次
            </span>
          

          

          

        </div>
      </header>
    

    
    
    
    <div class="post-body" itemprop="articleBody">

      
      

      
        <div id="vip-container"><p>本文将按照<a target="_blank" rel="noopener" href="https://blog.csdn.net/prestigeding/article/details/99101912">《RocketMQ 多副本前置篇：初探raft协议》</a>的思路来学习RocketMQ选主逻辑。首先先回顾一下关于Leader的一些思考：</p>
<ol>
<li>节点状态<br> 需要引入3种节点状态：Follower(跟随者)、Candidate(候选者)，该状态下的节点会发起投票请求，Leader(主节点)。</li>
<li>选举计时器<br> Follower、Candidate两个状态时，需要维护一个定时器，每次定时时间从150ms-300ms直接进行随机，即每个节点的定时过期不一样，Follower状态时，定时器到点后，触发一轮投票。节点在收到投票请求、Leader的心跳请求并作出响应后，需要重置定时器。</li>
<li>投票轮次Team<br> Candidate状态的节点，每发起一轮投票，Team加一。</li>
<li>投票机制<br> 每一轮一个节点只能为一个节点投赞成票，例如节点A中维护的轮次为3，并且已经为节点B投了赞成票，如果收到其他节点，投票轮次为3，则会投反对票，如果收到轮次为4的节点，是又可以投赞成票的。</li>
<li>成为Leader的条件<br> 必须得到集群中初始数量的大多数，例如如果集群中有3台，则必须得到两票，如果其中一台服务器宕机，剩下的两个节点，还能进行选主吗？答案是可以的，因为可以得到2票，超过初始集群中3的一半，所以通常集群中的机器各位尽量为奇数，因为4台的可用性与3台的一样。</li>
</ol>
<blockquote>
<p>温馨提示：本文是从源码的角度分析 DLedger 选主实现原理，可能比较鼓噪，文末给出了选主流程图。</p>
</blockquote>
<p>@<a href="%E6%9C%AC%E8%8A%82%E7%9B%AE%E5%BD%95">TOC</a></p>
<h2 id="1、DLedger关于选主的核心类图"><a href="#1、DLedger关于选主的核心类图" class="headerlink" title="1、DLedger关于选主的核心类图"></a>1、DLedger关于选主的核心类图</h2><p><img src="https://img-blog.csdnimg.cn/20190817201207350.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"></p>
<h3 id="1-1-DLedgerConfig"><a href="#1-1-DLedgerConfig" class="headerlink" title="1.1 DLedgerConfig"></a>1.1 DLedgerConfig</h3><p>多副本模块相关的配置信息，例如集群节点信息。</p>
<h3 id="1-2-MemberState"><a href="#1-2-MemberState" class="headerlink" title="1.2 MemberState"></a>1.2 MemberState</h3><p>节点状态机，即raft协议中的follower、candidate、leader三种状态的状态机实现。</p>
<h3 id="1-3-raft协议相关"><a href="#1-3-raft协议相关" class="headerlink" title="1.3 raft协议相关"></a>1.3 raft协议相关</h3><h4 id="1-3-1-DLedgerClientProtocol"><a href="#1-3-1-DLedgerClientProtocol" class="headerlink" title="1.3.1 DLedgerClientProtocol"></a>1.3.1 DLedgerClientProtocol</h4><p>DLedger客户端协议，主要定义如下三个方法，在后面的日志复制部分会重点阐述。</p>
<ul>
<li>CompletableFuture&lt; GetEntriesResponse&gt; get(GetEntriesRequest request)<br>客户端从服务器获取日志条目（获取数据）</li>
<li>CompletableFuture&lt; AppendEntryResponse&gt; append(AppendEntryRequest request)<br>客户端向服务器追加日志（存储数据）</li>
<li>CompletableFuture&lt; MetadataResponse&gt; metadata(MetadataRequest request)<br>获取元数据。</li>
</ul>
<h4 id="1-3-2-DLedgerProtocol"><a href="#1-3-2-DLedgerProtocol" class="headerlink" title="1.3.2 DLedgerProtocol"></a>1.3.2 DLedgerProtocol</h4><p>DLedger服务端协议，主要定义如下三个方法。</p>
<ul>
<li>CompletableFuture&lt; VoteResponse&gt; vote(VoteRequest request)<br>发起投票请求。</li>
<li>CompletableFuture&lt; HeartBeatResponse&gt; heartBeat(HeartBeatRequest request)<br>Leader向从节点发送心跳包。</li>
<li>CompletableFuture&lt; PullEntriesResponse&gt; pull(PullEntriesRequest request)<br>拉取日志条目，在日志复制部分会详细介绍。</li>
<li>CompletableFuture&lt; PushEntryResponse&gt; push(PushEntryRequest request)<br>推送日志条件，在日志复制部分会详细介绍。</li>
</ul>
<h4 id="1-3-3-协议处理Handler"><a href="#1-3-3-协议处理Handler" class="headerlink" title="1.3.3 协议处理Handler"></a>1.3.3 协议处理Handler</h4><p>DLedgerClientProtocolHandler、DLedgerProtocolHander协议处理器。</p>
<h3 id="1-4-DLedgerRpcService"><a href="#1-4-DLedgerRpcService" class="headerlink" title="1.4 DLedgerRpcService"></a>1.4 DLedgerRpcService</h3><p>DLedger Server(节点)之间的网络通信，默认基于Netty实现，其实现类为：DLedgerRpcNettyService。</p>
<h3 id="1-5-DLedgerLeaderElector"><a href="#1-5-DLedgerLeaderElector" class="headerlink" title="1.5 DLedgerLeaderElector"></a>1.5 DLedgerLeaderElector</h3><p>Leader选举实现器。</p>
<h3 id="1-6-DLedgerServer"><a href="#1-6-DLedgerServer" class="headerlink" title="1.6 DLedgerServer"></a>1.6 DLedgerServer</h3><p>Dledger Server，Dledger节点的封装类。</p>
<p>接下来将从DLedgerLeaderElector开始剖析DLedger是如何实现Leader选举的。（基于raft协议）。</p>
<span id="more"></span>

<h2 id="2、源码分析Leader选举"><a href="#2、源码分析Leader选举" class="headerlink" title="2、源码分析Leader选举"></a>2、源码分析Leader选举</h2><h3 id="2-1-DLedgerLeaderElector-类图"><a href="#2-1-DLedgerLeaderElector-类图" class="headerlink" title="2.1 DLedgerLeaderElector 类图"></a>2.1 DLedgerLeaderElector 类图</h3><p><img src="https://img-blog.csdnimg.cn/20190817201557322.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"><br>我们先一一来介绍其属性的含义：</p>
<ul>
<li>Random random<br>随机数生成器，对应raft协议中选举超时时间是一随机数。</li>
<li>DLedgerConfig dLedgerConfig<br>配置参数。</li>
<li>MemberState memberState<br>节点状态机。</li>
<li>DLedgerRpcService dLedgerRpcService<br>rpc服务，实现向集群内的节点发送心跳包、投票的RPC实现。<br>l- ong lastLeaderHeartBeatTime<br>上次收到心跳包的时间戳。</li>
<li>long lastSendHeartBeatTime<br>上次发送心跳包的时间戳。</li>
<li>long lastSuccHeartBeatTime<br>上次成功收到心跳包的时间戳。</li>
<li>int heartBeatTimeIntervalMs<br>一个心跳包的周期，默认为2s。</li>
<li>int maxHeartBeatLeak<br>允许最大的N个心跳周期内未收到心跳包，状态为Follower的节点只有超过 maxHeartBeatLeak * heartBeatTimeIntervalMs 的时间内未收到主节点的心跳包，才会重新进入 Candidate 状态，重新下一轮的选举。</li>
<li>long nextTimeToRequestVote<br>发送下一个心跳包的时间戳。</li>
<li>boolean needIncreaseTermImmediately<br>是否应该立即发起投票。</li>
<li>int minVoteIntervalMs<br>最小的发送投票间隔时间，默认为300ms。</li>
<li>int maxVoteIntervalMs<br>最大的发送投票的间隔，默认为1000ms。</li>
<li>List&lt; RoleChangeHandler&gt; roleChangeHandlers<br>注册的节点状态处理器，通过 addRoleChangeHandler 方法添加。</li>
<li>long lastVoteCost<br>上一次投票的开销。</li>
<li>StateMaintainer stateMaintainer<br>状态机管理器。</li>
</ul>
<h3 id="2-2-启动选举状态管理器"><a href="#2-2-启动选举状态管理器" class="headerlink" title="2.2 启动选举状态管理器"></a>2.2 启动选举状态管理器</h3><p>通过 DLedgerLeaderElector 的 startup 方法启动状态管理机，代码如下：<br>DLedgerLeaderElector#startup</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">startup</span><span class="params">()</span> </span>&#123;</span><br><span class="line">    stateMaintainer.start();   <span class="comment">// @1</span></span><br><span class="line">    <span class="keyword">for</span> (RoleChangeHandler roleChangeHandler : roleChangeHandlers) &#123;   <span class="comment">// @2</span></span><br><span class="line">        roleChangeHandler.startup();</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：启动状态维护管理器。</p>
<p>代码@2：遍历状态改变监听器并启动它，可通过DLedgerLeaderElector 的 addRoleChangeHandler 方法增加状态变化监听器。</p>
<p>其中的是启动状态管理器线程，其run方法实现：</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">run</span><span class="params">()</span> </span>&#123;</span><br><span class="line">    <span class="keyword">while</span> (running.get()) &#123;</span><br><span class="line">        <span class="keyword">try</span> &#123;</span><br><span class="line">            doWork();    </span><br><span class="line">        &#125; <span class="keyword">catch</span> (Throwable t) &#123;</span><br><span class="line">            <span class="keyword">if</span> (logger != <span class="keyword">null</span>) &#123;</span><br><span class="line">                logger.error(<span class="string">&quot;Unexpected Error in running &#123;&#125; &quot;</span>, getName(), t);</span><br><span class="line">            &#125;</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;</span><br><span class="line">    latch.countDown();</span><br><span class="line">&#125; </span><br></pre></td></tr></table></figure>
<p>从上面来看，主要是循环调用doWork方法，接下来重点看其doWork的实现：</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">doWork</span><span class="params">()</span> </span>&#123;</span><br><span class="line">    <span class="keyword">try</span> &#123;</span><br><span class="line">        <span class="keyword">if</span> (DLedgerLeaderElector.<span class="keyword">this</span>.dLedgerConfig.isEnableLeaderElector()) &#123;   <span class="comment">// @1</span></span><br><span class="line">            DLedgerLeaderElector.<span class="keyword">this</span>.refreshIntervals(dLedgerConfig);                 <span class="comment">// @2</span></span><br><span class="line">            DLedgerLeaderElector.<span class="keyword">this</span>.maintainState();                                           <span class="comment">// @3</span></span><br><span class="line">        &#125;</span><br><span class="line">        sleep(<span class="number">10</span>);                                                                                                    <span class="comment">// @4</span></span><br><span class="line">    &#125; <span class="keyword">catch</span> (Throwable t) &#123;</span><br><span class="line">        DLedgerLeaderElector.logger.error(<span class="string">&quot;Error in heartbeat&quot;</span>, t);</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：如果该节点参与Leader选举，则首先调用@2重置定时器，然后驱动状态机(@3)，是接下来重点需要剖析的。</p>
<p>代码@4：没执行一次选主，休息10ms。</p>
<p>DLedgerLeaderElector#maintainState</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> <span class="keyword">void</span> <span class="title">maintainState</span><span class="params">()</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">    <span class="keyword">if</span> (memberState.isLeader()) &#123;  </span><br><span class="line">        maintainAsLeader();</span><br><span class="line">    &#125; <span class="keyword">else</span> <span class="keyword">if</span> (memberState.isFollower()) &#123;</span><br><span class="line">        maintainAsFollower();</span><br><span class="line">    &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">        maintainAsCandidate();</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>根据当前的状态机状态，执行对应的操作，从raft协议中可知，总共存在3种状态：</p>
<ul>
<li>leader<br>领导者，主节点，该状态下，需要定时向从节点发送心跳包，用来传播数据、确保其领导地位。</li>
<li>follower<br>从节点，该状态下，会开启定时器，尝试进入到candidate状态，以便发起投票选举，同时一旦收到主节点的心跳包，则重置定时器。</li>
<li>candidate<br>候选者，该状态下的节点会发起投票，尝试选择自己为主节点，选举成功后，不会存在该状态下的节点。</li>
</ul>
<p>我们在继续往下看之前，需要知道 memberState 的初始值是什么？我们追溯到创建 MemberState 的地方，发现其初始状态为 CANDIDATE。那我们接下从 maintainAsCandidate 方法开始跟进。</p>
<blockquote>
<p>温馨提示：在raft协议中，节点的状态默认为follower，DLedger的实现从candidate开始，一开始，集群内的所有节点都会尝试发起投票，这样第一轮要达成选举几乎不太可能。</p>
</blockquote>
<h3 id="2-3-选举状态机状态流转"><a href="#2-3-选举状态机状态流转" class="headerlink" title="2.3 选举状态机状态流转"></a>2.3 选举状态机状态流转</h3><p>整个状态机的驱动，由线程反复执行maintainState方法。下面重点来分析其状态的驱动。</p>
<h4 id="2-3-1-maintainAsCandidate-方法"><a href="#2-3-1-maintainAsCandidate-方法" class="headerlink" title="2.3.1  maintainAsCandidate 方法"></a>2.3.1  maintainAsCandidate 方法</h4><p>DLedgerLeaderElector#maintainAsCandidate</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (System.currentTimeMillis() &lt; nextTimeToRequestVote &amp;&amp; !needIncreaseTermImmediately) &#123;</span><br><span class="line">    <span class="keyword">return</span>;</span><br><span class="line">&#125;</span><br><span class="line"><span class="keyword">long</span> term;</span><br><span class="line"><span class="keyword">long</span> ledgerEndTerm;</span><br><span class="line"><span class="keyword">long</span> ledgerEndIndex;</span><br></pre></td></tr></table></figure>
<p>Step1：首先先介绍几个变量的含义：</p>
<ul>
<li>nextTimeToRequestVote<br>下一次发发起的投票的时间，如果当前时间小于该值，说明计时器未过期，此时无需发起投票。</li>
<li>needIncreaseTermImmediately<br>是否应该立即发起投票。如果为true，则忽略计时器，该值默认为false，当收到从主节点的心跳包并且当前状态机的轮次大于主节点的轮次，说明集群中Leader的投票轮次小于从几点的轮次，应该立即发起新的投票。</li>
<li>term<br>投票轮次。</li>
<li>ledgerEndTerm<br>Leader节点当前的投票轮次。</li>
<li>ledgerEndIndex<br>当前日志的最大序列，即下一条日志的开始index，在日志复制部分会详细介绍。</li>
</ul>
<p>DLedgerLeaderElector#maintainAsCandidate</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">synchronized</span> (memberState) &#123;</span><br><span class="line">    <span class="keyword">if</span> (!memberState.isCandidate()) &#123;</span><br><span class="line">        <span class="keyword">return</span>;</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="keyword">if</span> (lastParseResult == VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT || needIncreaseTermImmediately) &#123;</span><br><span class="line">        <span class="keyword">long</span> prevTerm = memberState.currTerm();</span><br><span class="line">        term = memberState.nextTerm();</span><br><span class="line">        logger.info(<span class="string">&quot;&#123;&#125;_[INCREASE_TERM] from &#123;&#125; to &#123;&#125;&quot;</span>, memberState.getSelfId(), prevTerm, term);</span><br><span class="line">        lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;</span><br><span class="line">    &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">        term = memberState.currTerm();</span><br><span class="line">    &#125;</span><br><span class="line">    ledgerEndIndex = memberState.getLedgerEndIndex();</span><br><span class="line">    ledgerEndTerm = memberState.getLedgerEndTerm();</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step2：初始化team、ledgerEndIndex 、ledgerEndTerm 属性，其实现关键点如下：</p>
<ul>
<li>如果上一次的投票结果为待下一次投票或应该立即开启投票，并且根据当前状态机获取下一轮的投票轮次，稍后会着重讲解一下状态机轮次的维护机制。</li>
<li>如果上一次的投票结果不是WAIT_TO_VOTE_NEXT(等待下一轮投票)，则投票轮次依然为状态机内部维护的轮次。</li>
</ul>
<p>DLedgerLeaderElector#maintainAsCandidate</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (needIncreaseTermImmediately) &#123;</span><br><span class="line">    nextTimeToRequestVote = getNextTimeToRequestVote();</span><br><span class="line">    needIncreaseTermImmediately = <span class="keyword">false</span>;</span><br><span class="line">    <span class="keyword">return</span>;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step3：如果needIncreaseTermImmediately为true，则重置该标记位为false，并重新设置下一次投票超时时间，其实现代码如下：</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> <span class="keyword">long</span> <span class="title">getNextTimeToRequestVote</span><span class="params">()</span> </span>&#123;</span><br><span class="line">    <span class="keyword">return</span> System.currentTimeMillis() + lastVoteCost + minVoteIntervalMs + random.nextInt(maxVoteIntervalMs - minVoteIntervalMs);</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>下一次倒计时：当前时间戳 + 上次投票的开销 + 最小投票间隔(300ms) +  （1000- 300 ）之间的随机值。</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">final</span> List&lt;CompletableFuture&lt;VoteResponse&gt;&gt; quorumVoteResponses = voteForQuorumResponses(term, ledgerEndTerm, ledgerEndIndex);</span><br></pre></td></tr></table></figure>
<p>Step4：向集群内的其他节点发起投票请，并返回投票结果列表，稍后会重点分析其投票过程。可以预见，接下来就是根据各投票结果进行仲裁。</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">final</span> AtomicLong knownMaxTermInGroup = <span class="keyword">new</span> AtomicLong(-<span class="number">1</span>);</span><br><span class="line"><span class="keyword">final</span> AtomicInteger allNum = <span class="keyword">new</span> AtomicInteger(<span class="number">0</span>);</span><br><span class="line"><span class="keyword">final</span> AtomicInteger validNum = <span class="keyword">new</span> AtomicInteger(<span class="number">0</span>);</span><br><span class="line"><span class="keyword">final</span> AtomicInteger acceptedNum = <span class="keyword">new</span> AtomicInteger(<span class="number">0</span>);</span><br><span class="line"><span class="keyword">final</span> AtomicInteger notReadyTermNum = <span class="keyword">new</span> AtomicInteger(<span class="number">0</span>);</span><br><span class="line"><span class="keyword">final</span> AtomicInteger biggerLedgerNum = <span class="keyword">new</span> AtomicInteger(<span class="number">0</span>);</span><br><span class="line"><span class="keyword">final</span> AtomicBoolean alreadyHasLeader = <span class="keyword">new</span> AtomicBoolean(<span class="keyword">false</span>);</span><br></pre></td></tr></table></figure>
<p>Step5：在进行投票结果仲裁之前，先来介绍几个局部变量的含义：</p>
<ul>
<li>knownMaxTermInGroup<br>已知的最大投票轮次。</li>
<li>allNum<br>所有投票票数。</li>
<li>validNum<br>有效投票数。 </li>
<li>acceptedNum<br>获得的投票数。 </li>
<li>notReadyTermNum<br>未准备投票的节点数量，如果对端节点的投票轮次小于发起投票的轮次，则认为对端未准备好，对端节点使用本次的轮次进入 - Candidate 状态。</li>
<li>biggerLedgerNum<br> 发起投票的节点的ledgerEndTerm小于对端节点的个数。</li>
<li>alreadyHasLeader<br>是否已经存在Leader。<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">for</span> (CompletableFuture&lt;VoteResponse&gt; future : quorumVoteResponses) &#123;</span><br><span class="line">   <span class="comment">// 省略部分代码</span></span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
Step5：遍历投票结果，收集投票结果，接下来重点看其内部实现。<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (x.getVoteResult() != VoteResponse.RESULT.UNKNOWN) &#123;</span><br><span class="line">    validNum.incrementAndGet();</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
Step6：如果投票结果不是UNKNOW，则有效投票数量增1。<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">synchronized</span> (knownMaxTermInGroup) &#123;</span><br><span class="line">    <span class="keyword">switch</span> (x.getVoteResult()) &#123;</span><br><span class="line">        <span class="keyword">case</span> ACCEPT:</span><br><span class="line">            acceptedNum.incrementAndGet();</span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">        <span class="keyword">case</span> REJECT_ALREADY_VOTED:</span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">        <span class="keyword">case</span> REJECT_ALREADY_HAS_LEADER:</span><br><span class="line">            alreadyHasLeader.compareAndSet(<span class="keyword">false</span>, <span class="keyword">true</span>);</span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">        <span class="keyword">case</span> REJECT_TERM_SMALL_THAN_LEDGER:</span><br><span class="line">        <span class="keyword">case</span> REJECT_EXPIRED_VOTE_TERM:</span><br><span class="line">            <span class="keyword">if</span> (x.getTerm() &gt; knownMaxTermInGroup.get()) &#123;</span><br><span class="line">                knownMaxTermInGroup.set(x.getTerm());</span><br><span class="line">            &#125;</span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">        <span class="keyword">case</span> REJECT_EXPIRED_LEDGER_TERM:</span><br><span class="line">        <span class="keyword">case</span> REJECT_SMALL_LEDGER_END_INDEX:</span><br><span class="line">            biggerLedgerNum.incrementAndGet();</span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">        <span class="keyword">case</span> REJECT_TERM_NOT_READY:</span><br><span class="line">            notReadyTermNum.incrementAndGet();</span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">        <span class="keyword">default</span>:</span><br><span class="line">            <span class="keyword">break</span>;</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
Step7：统计投票结构，几个关键点如下：</li>
<li>ACCEPT<br>赞成票，acceptedNum加一，只有得到的赞成票超过集群节点数量的一半才能成为Leader。</li>
<li>REJECT_ALREADY_VOTED<br>拒绝票，原因是已经投了其他节点的票。</li>
<li>REJECT_ALREADY_HAS_LEADER<br>拒绝票，原因是因为集群中已经存在Leaer了。alreadyHasLeader设置为true，无需在判断其他投票结果了，结束本轮投票。</li>
<li>REJECT_TERM_SMALL_THAN_LEDGER<br>拒绝票，如果自己维护的term小于远端维护的ledgerEndTerm，则返回该结果，如果对端的team大于自己的team，需要记录对端最大的投票轮次，以便更新自己的投票轮次。</li>
<li>REJECT_EXPIRED_VOTE_TERM<br>拒绝票，如果自己维护的term小于远端维护的term，更新自己维护的投票轮次。</li>
<li>REJECT_EXPIRED_LEDGER_TERM<br>拒绝票，如果自己维护的 ledgerTerm小于对端维护的ledgerTerm，则返回该结果。如果是此种情况，增加计数器- biggerLedgerNum的值。</li>
<li>REJECT_SMALL_LEDGER_END_INDEX<br>拒绝票，如果对端的ledgerTeam与自己维护的ledgerTeam相等，但是自己维护的dedgerEndIndex小于对端维护的值，返回该值，增加biggerLedgerNum计数器的值。</li>
<li>REJECT_TERM_NOT_READY<br>拒绝票，对端的投票轮次小于自己的team，则认为对端还未准备好投票，对端使用自己的投票轮次，是自己进入到Candidate状态。<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">try</span> &#123;</span><br><span class="line">    voteLatch.await(<span class="number">3000</span> + random.nextInt(maxVoteIntervalMs), TimeUnit.MILLISECONDS);</span><br><span class="line">&#125; <span class="keyword">catch</span> (Throwable ignore) &#123;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
Step8：等待收集投票结果，并设置超时时间。<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br></pre></td><td class="code"><pre><span class="line">lastVoteCost = DLedgerUtils.elapsed(startVoteTimeMs);</span><br><span class="line">VoteResponse.ParseResult parseResult;</span><br><span class="line"><span class="keyword">if</span> (knownMaxTermInGroup.get() &gt; term) &#123;</span><br><span class="line">    parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;</span><br><span class="line">    nextTimeToRequestVote = getNextTimeToRequestVote();</span><br><span class="line">    changeRoleToCandidate(knownMaxTermInGroup.get());</span><br><span class="line">&#125; <span class="keyword">else</span> <span class="keyword">if</span> (alreadyHasLeader.get()) &#123;</span><br><span class="line">    parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;</span><br><span class="line">    nextTimeToRequestVote = getNextTimeToRequestVote() + heartBeatTimeIntervalMs * maxHeartBeatLeak;</span><br><span class="line">&#125; <span class="keyword">else</span> <span class="keyword">if</span> (!memberState.isQuorum(validNum.get())) &#123;</span><br><span class="line">    parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;</span><br><span class="line">    nextTimeToRequestVote = getNextTimeToRequestVote();</span><br><span class="line">&#125; <span class="keyword">else</span> <span class="keyword">if</span> (memberState.isQuorum(acceptedNum.get())) &#123;</span><br><span class="line">    parseResult = VoteResponse.ParseResult.PASSED;</span><br><span class="line">&#125; <span class="keyword">else</span> <span class="keyword">if</span> (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) &#123;</span><br><span class="line">    parseResult = VoteResponse.ParseResult.REVOTE_IMMEDIATELY;</span><br><span class="line">&#125; <span class="keyword">else</span> <span class="keyword">if</span> (memberState.isQuorum(acceptedNum.get() + biggerLedgerNum.get())) &#123;</span><br><span class="line">    parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;</span><br><span class="line">    nextTimeToRequestVote = getNextTimeToRequestVote();</span><br><span class="line">&#125; <span class="keyword">else</span> &#123;</span><br><span class="line">    parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;</span><br><span class="line">    nextTimeToRequestVote = getNextTimeToRequestVote();</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
Step9：根据收集的投票结果判断是否能成为Leader。<blockquote>
<p>温馨提示：在讲解关键点之前，我们先定义先将（当前时间戳 + 上次投票的开销 + 最小投票间隔(300ms) +  （1000- 300 ）之间的随机值）定义为“ 1个常规计时器”。</p>
</blockquote>
</li>
</ul>
<p>其关键点如下：</p>
<ul>
<li>如果对端的投票轮次大于发起投票的节点，则该节点使用对端的轮次，重新进入到Candidate状态，并且重置投票计时器，其值为“1个常规计时器”</li>
<li>如果已经存在Leader，该节点重新进入到Candidate,并重置定时器，该定时器的时间： “1个常规计时器” + heartBeatTimeIntervalMs * maxHeartBeatLeak ，其中 heartBeatTimeIntervalMs 为一次心跳间隔时间，<br>maxHeartBeatLeak 为  允许最大丢失的心跳包，即如果Flower节点在多少个心跳周期内未收到心跳包，则认为Leader已下线。</li>
<li>如果收到的有效票数未超过半数，则重置计时器为“ 1个常规计时器”，然后等待重新投票，注意状态为WAIT_TO_REVOTE，该状态下的特征是下次投票时不增加投票轮次。</li>
<li>如果得到的赞同票超过半数，则成为Leader。</li>
<li>如果得到的赞成票加上未准备投票的节点数超过半数，则应该立即发起投票，故其结果为REVOTE_IMMEDIATELY。</li>
<li>如果得到的赞成票加上对端维护的ledgerEndIndex超过半数，则重置计时器，继续本轮次的选举。</li>
<li>其他情况，开启下一轮投票。<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (parseResult == VoteResponse.ParseResult.PASSED) &#123;</span><br><span class="line">    logger.info(<span class="string">&quot;[&#123;&#125;] [VOTE_RESULT] has been elected to be the leader in term &#123;&#125;&quot;</span>, memberState.getSelfId(), term);</span><br><span class="line">    changeRoleToLeader(term);</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
Step10：如果投票成功，则状态机状态设置为Leader，然后状态管理在驱动状态时会调用DLedgerLeaderElector#maintainState时，将进入到maintainAsLeader方法。</li>
</ul>
<h4 id="2-3-2-maintainAsLeader-方法"><a href="#2-3-2-maintainAsLeader-方法" class="headerlink" title="2.3.2  maintainAsLeader 方法"></a>2.3.2  maintainAsLeader 方法</h4><p>经过maintainAsCandidate 投票选举后，被其他节点选举成为领导后，会执行该方法，其他节点的状态还是Candidate，并在计时器过期后，又尝试去发起选举。接下来重点分析成为Leader节点后，该节点会做些什么？</p>
<p>DLedgerLeaderElector#maintainAsLeader</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> <span class="keyword">void</span> <span class="title">maintainAsLeader</span><span class="params">()</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">    <span class="keyword">if</span> (DLedgerUtils.elapsed(lastSendHeartBeatTime) &gt; heartBeatTimeIntervalMs) &#123;  <span class="comment">// @1</span></span><br><span class="line">        <span class="keyword">long</span> term;</span><br><span class="line">        String leaderId;</span><br><span class="line">        <span class="keyword">synchronized</span> (memberState) &#123;</span><br><span class="line">            <span class="keyword">if</span> (!memberState.isLeader()) &#123;     <span class="comment">// @2</span></span><br><span class="line">                <span class="comment">//stop sending</span></span><br><span class="line">                <span class="keyword">return</span>;</span><br><span class="line">            &#125;</span><br><span class="line">            term = memberState.currTerm();</span><br><span class="line">            leaderId = memberState.getLeaderId();</span><br><span class="line">            lastSendHeartBeatTime = System.currentTimeMillis();    <span class="comment">// @3</span></span><br><span class="line">        &#125;</span><br><span class="line">        sendHeartbeats(term, leaderId);    <span class="comment">// @4</span></span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：首先判断上一次发送心跳的时间与当前时间的差值是否大于心跳包发送间隔，如果超过，则说明需要发送心跳包。</p>
<p>代码@2：如果当前不是leader节点，则直接返回，主要是为了二次判断。</p>
<p>代码@3：重置心跳包发送计时器。</p>
<p>代码@4：向集群内的所有节点发送心跳包，稍后会详细介绍心跳包的发送。</p>
<h4 id="2-3-3-maintainAsFollower方法"><a href="#2-3-3-maintainAsFollower方法" class="headerlink" title="2.3.3  maintainAsFollower方法"></a>2.3.3  maintainAsFollower方法</h4><p>当 Candidate 状态的节点在收到主节点发送的心跳包后，会将状态变更为follower，那我们先来看一下在follower状态下，节点会做些什么事情？</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br></pre></td><td class="code"><pre><span class="line"><span class="function"><span class="keyword">private</span> <span class="keyword">void</span> <span class="title">maintainAsFollower</span><span class="params">()</span> </span>&#123;</span><br><span class="line">    <span class="keyword">if</span> (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) &gt; <span class="number">2</span> * heartBeatTimeIntervalMs) &#123;   </span><br><span class="line">        <span class="keyword">synchronized</span> (memberState) &#123;</span><br><span class="line">            <span class="keyword">if</span> (memberState.isFollower() &amp;&amp; (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) &gt; maxHeartBeatLeak * heartBeatTimeIntervalMs)) &#123;</span><br><span class="line">                logger.info(<span class="string">&quot;[&#123;&#125;][HeartBeatTimeOut] lastLeaderHeartBeatTime: &#123;&#125; heartBeatTimeIntervalMs: &#123;&#125; lastLeader=&#123;&#125;&quot;</span>, memberState.getSelfId(), <span class="keyword">new</span> Timestamp(lastLeaderHeartBeatTime), heartBeatTimeIntervalMs, memberState.getLeaderId());</span><br><span class="line">                changeRoleToCandidate(memberState.currTerm());</span><br><span class="line">            &#125;</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>如果maxHeartBeatLeak (默认为3)个心跳包周期内未收到心跳，则将状态变更为Candidate。</p>
<p>状态机的驱动就介绍到这里，在上面的流程中，其实我们忽略了两个重要的过程，一个是发起投票请求与投票请求响应、发送心跳包与心跳包响应，那我们接下来将重点介绍这两个过程。</p>
<h3 id="2-4-投票与投票请求"><a href="#2-4-投票与投票请求" class="headerlink" title="2.4 投票与投票请求"></a>2.4 投票与投票请求</h3><p>节点的状态为 Candidate 时会向集群内的其他节点发起投票请求(个人觉得理解为拉票更好)，向对方询问是否愿意选举我为Leader，对端节点会根据自己的情况对其投赞成票、拒绝票，如果是拒绝票，还会给出拒绝原因，具体由voteForQuorumResponses、handleVote 这两个方法来实现，接下来我们分别对这两个方法进行详细分析。</p>
<h4 id="2-4-1-voteForQuorumResponses"><a href="#2-4-1-voteForQuorumResponses" class="headerlink" title="2.4.1 voteForQuorumResponses"></a>2.4.1 voteForQuorumResponses</h4><p>发起投票请求。</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">private</span> List&lt;CompletableFuture&lt;VoteResponse&gt;&gt; voteForQuorumResponses(<span class="keyword">long</span> term, <span class="keyword">long</span> ledgerEndTerm,</span><br><span class="line">    <span class="keyword">long</span> ledgerEndIndex) <span class="keyword">throws</span> Exception &#123;   <span class="comment">// @1</span></span><br><span class="line">    List&lt;CompletableFuture&lt;VoteResponse&gt;&gt; responses = <span class="keyword">new</span> ArrayList&lt;&gt;();</span><br><span class="line">    <span class="keyword">for</span> (String id : memberState.getPeerMap().keySet()) &#123;               <span class="comment">// @2</span></span><br><span class="line">        VoteRequest voteRequest = <span class="keyword">new</span> VoteRequest();                  <span class="comment">// @3 start</span></span><br><span class="line">        voteRequest.setGroup(memberState.getGroup());</span><br><span class="line">        voteRequest.setLedgerEndIndex(ledgerEndIndex);</span><br><span class="line">        voteRequest.setLedgerEndTerm(ledgerEndTerm);</span><br><span class="line">        voteRequest.setLeaderId(memberState.getSelfId());</span><br><span class="line">        voteRequest.setTerm(term);</span><br><span class="line">        voteRequest.setRemoteId(id);</span><br><span class="line">        CompletableFuture&lt;VoteResponse&gt; voteResponse;          <span class="comment">// @3 end</span></span><br><span class="line">        <span class="keyword">if</span> (memberState.getSelfId().equals(id)) &#123;                             <span class="comment">// @4</span></span><br><span class="line">            voteResponse = handleVote(voteRequest, <span class="keyword">true</span>);</span><br><span class="line">        &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">            <span class="comment">//async</span></span><br><span class="line">            voteResponse = dLedgerRpcService.vote(voteRequest);  <span class="comment">// @5</span></span><br><span class="line">        &#125;</span><br><span class="line">        responses.add(voteResponse);</span><br><span class="line">    &#125;</span><br><span class="line">    <span class="keyword">return</span> responses;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>代码@1：首先先解释一下参数的含义：</p>
<ul>
<li>long term<br>发起投票的节点当前的投票轮次。</li>
<li>long ledgerEndTerm<br>发起投票节点维护的已知的最大投票轮次。</li>
<li>long ledgerEndIndex<br>发起投票节点维护的已知的最大日志条目索引。</li>
</ul>
<p>代码@2：遍历集群内的节点集合，准备异步发起投票请求。这个集合在启动的时候指定，不能修改。</p>
<p>代码@3：构建投票请求。</p>
<p>代码@4：如果是发送给自己的，则直接调用handleVote进行投票请求响应，如果是发送给集群内的其他节点，则通过网络发送投票请求，对端节点调用各自的handleVote对集群进行响应。</p>
<p>接下来重点关注 handleVote 方法，重点探讨其投票处理逻辑。</p>
<h4 id="2-4-2-handleVote-方法"><a href="#2-4-2-handleVote-方法" class="headerlink" title="2.4.2 handleVote 方法"></a>2.4.2 handleVote 方法</h4><p>由于handleVote 方法会并发被调用，因为可能同时收到多个节点的投票请求，故本方法都被synchronized方法包含，锁定的对象为状态机 memberState 对象。</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (!memberState.isPeerMember(request.getLeaderId())) &#123;</span><br><span class="line">    logger.warn(<span class="string">&quot;[BUG] [HandleVote] remoteId=&#123;&#125; is an unknown member&quot;</span>, request.getLeaderId());</span><br><span class="line">    <span class="keyword">return</span> CompletableFuture.completedFuture(newVoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNKNOWN_LEADER));</span><br><span class="line">&#125;</span><br><span class="line"><span class="keyword">if</span> (!self &amp;&amp; memberState.getSelfId().equals(request.getLeaderId())) &#123;</span><br><span class="line">    logger.warn(<span class="string">&quot;[BUG] [HandleVote] selfId=&#123;&#125; but remoteId=&#123;&#125;&quot;</span>, memberState.getSelfId(), request.getLeaderId());</span><br><span class="line">    <span class="keyword">return</span> CompletableFuture.completedFuture(<span class="keyword">new</span> VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_UNEXPECTED_LEADER));</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step1：为了逻辑的完整性对其请求进行检验，除非有BUG存在，否则是不会出现上述问题的。</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (request.getTerm() &lt; memberState.currTerm()) &#123;    <span class="comment">// @1</span></span><br><span class="line">    <span class="keyword">return</span> CompletableFuture.completedFuture(<span class="keyword">new</span> VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_VOTE_TERM));</span><br><span class="line">&#125; <span class="keyword">else</span> <span class="keyword">if</span> (request.getTerm() == memberState.currTerm()) &#123;   <span class="comment">// @2</span></span><br><span class="line">    <span class="keyword">if</span> (memberState.currVoteFor() == <span class="keyword">null</span>) &#123;</span><br><span class="line">        <span class="comment">//let it go</span></span><br><span class="line">    &#125; <span class="keyword">else</span> <span class="keyword">if</span> (memberState.currVoteFor().equals(request.getLeaderId())) &#123;</span><br><span class="line">         <span class="comment">//repeat just let it go</span></span><br><span class="line">    &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">        <span class="keyword">if</span> (memberState.getLeaderId() != <span class="keyword">null</span>) &#123;</span><br><span class="line">             <span class="keyword">return</span> CompletableFuture.completedFuture(<span class="keyword">new</span> VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY__HAS_LEADER));</span><br><span class="line">        &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">                <span class="keyword">return</span> CompletableFuture.completedFuture(<span class="keyword">new</span> VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY_VOTED));</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;</span><br><span class="line">&#125; <span class="keyword">else</span> &#123;            <span class="comment">// @3</span></span><br><span class="line">    <span class="comment">//stepped down by larger term</span></span><br><span class="line">    changeRoleToCandidate(request.getTerm());</span><br><span class="line">    needIncreaseTermImmediately = <span class="keyword">true</span>;</span><br><span class="line">    <span class="comment">//only can handleVote when the term is consistent</span></span><br><span class="line">    <span class="keyword">return</span> CompletableFuture.completedFuture(<span class="keyword">new</span> VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_NOT_READY));</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step2：判断发起节点、响应节点维护的team进行投票“仲裁”，分如下3种情况讨论：</p>
<ul>
<li>如果发起投票节点的 term 小于当前节点的 term<br>此种情况下投拒绝票，也就是说在 raft 协议的世界中，谁的 term 越大，越有话语权。 </li>
<li>如果发起投票节点的 term 等于当前节点的 term<br>如果两者的 term 相等，说明两者都处在同一个投票轮次中，地位平等，接下来看该节点是否已经投过票。<ul>
<li>如果未投票、或已投票给请求节点，则继续后面的逻辑（请看step3）。</li>
<li>如果该节点已存在的Leader节点，则拒绝并告知已存在Leader节点。</li>
<li>如果该节点还未有Leader节点，但已经投了其他节点的票，则拒绝请求节点，并告知已投票。</li>
</ul>
</li>
<li>如果发起投票节点的 term 大于当前节点的 term<br>拒绝请求节点的投票请求，并告知自身还未准备投票，自身会使用请求节点的投票轮次立即进入到Candidate状态。<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (request.getLedgerEndTerm() &lt; memberState.getLedgerEndTerm()) &#123;</span><br><span class="line">    <span class="keyword">return</span> CompletableFuture.completedFuture(<span class="keyword">new</span> VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_LEDGER_TERM));</span><br><span class="line">&#125; <span class="keyword">else</span> <span class="keyword">if</span> (request.getLedgerEndTerm() == memberState.getLedgerEndTerm() &amp;&amp; request.getLedgerEndIndex() &lt; memberState.getLedgerEndIndex()) &#123;</span><br><span class="line">    <span class="keyword">return</span> CompletableFuture.completedFuture(<span class="keyword">new</span> VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_SMALL_LEDGER_END_INDEX));</span><br><span class="line">&#125;</span><br><span class="line"></span><br><span class="line"><span class="keyword">if</span> (request.getTerm() &lt; memberState.getLedgerEndTerm()) &#123;</span><br><span class="line">    <span class="keyword">return</span> CompletableFuture.completedFuture(<span class="keyword">new</span> VoteResponse(request).term(memberState.getLedgerEndTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_SMALL_THAN_LEDGER));</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
Step3：判断请求节点的 ledgerEndTerm 与当前节点的 ledgerEndTerm，这里主要是判断日志的复制进度。</li>
<li>如果请求节点的 ledgerEndTerm  小于当前节点的 ledgerEndTerm 则拒绝，其原因是请求节点的日志复制进度比当前节点低，这种情况是不能成为主节点的。</li>
<li>如果 ledgerEndTerm  相等，但是 ledgerEndIndex 比当前节点小，则拒绝，原因与上一条相同。</li>
<li>如果请求的 term 小于 ledgerEndTerm 以同样的理由拒绝。</li>
</ul>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br></pre></td><td class="code"><pre><span class="line">memberState.setCurrVoteFor(request.getLeaderId());</span><br><span class="line"><span class="keyword">return</span> CompletableFuture.completedFuture(<span class="keyword">new</span> VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.ACCEPT));</span><br></pre></td></tr></table></figure>
<p>Step4：经过层层条件帅选，将宝贵的赞成票投给请求节点。</p>
<p>经过几轮投票，最终一个节点能成功被推举出来，选为主节点。主节点为了维持其领导地位，需要定时向从节点发送心跳包，接下来我们重点看一下心跳包的发送与响应。</p>
<h3 id="2-5-心跳包与心跳包响应"><a href="#2-5-心跳包与心跳包响应" class="headerlink" title="2.5 心跳包与心跳包响应"></a>2.5 心跳包与心跳包响应</h3><h4 id="2-5-1-sendHeartbeats"><a href="#2-5-1-sendHeartbeats" class="headerlink" title="2.5.1 sendHeartbeats"></a>2.5.1 sendHeartbeats</h4><p>Step1：遍历集群中的节点，异步发送心跳包。</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br></pre></td><td class="code"><pre><span class="line"> CompletableFuture&lt;HeartBeatResponse&gt; future = dLedgerRpcService.heartBeat(heartBeatRequest);</span><br><span class="line">    future.whenComplete((HeartBeatResponse x, Throwable ex) -&gt; &#123;</span><br><span class="line">        <span class="keyword">try</span> &#123;</span><br><span class="line"></span><br><span class="line">            <span class="keyword">if</span> (ex != <span class="keyword">null</span>) &#123;</span><br><span class="line">                <span class="keyword">throw</span> ex;</span><br><span class="line">            &#125;</span><br><span class="line">            <span class="keyword">switch</span> (DLedgerResponseCode.valueOf(x.getCode())) &#123;</span><br><span class="line">                <span class="keyword">case</span> SUCCESS:</span><br><span class="line">                    succNum.incrementAndGet();</span><br><span class="line">                    <span class="keyword">break</span>;</span><br><span class="line">                <span class="keyword">case</span> EXPIRED_TERM:</span><br><span class="line">                    maxTerm.set(x.getTerm());</span><br><span class="line">                    <span class="keyword">break</span>;</span><br><span class="line">                <span class="keyword">case</span> INCONSISTENT_LEADER:</span><br><span class="line">                    inconsistLeader.compareAndSet(<span class="keyword">false</span>, <span class="keyword">true</span>);</span><br><span class="line">                    <span class="keyword">break</span>;</span><br><span class="line">                <span class="keyword">case</span> TERM_NOT_READY:</span><br><span class="line">                    notReadyNum.incrementAndGet();</span><br><span class="line">                    <span class="keyword">break</span>;</span><br><span class="line">                <span class="keyword">default</span>:</span><br><span class="line">                    <span class="keyword">break</span>;</span><br><span class="line">            &#125;</span><br><span class="line">            <span class="keyword">if</span> (memberState.isQuorum(succNum.get())</span><br><span class="line">                || memberState.isQuorum(succNum.get() + notReadyNum.get())) &#123;</span><br><span class="line">                beatLatch.countDown();</span><br><span class="line">            &#125;</span><br><span class="line">        &#125; <span class="keyword">catch</span> (Throwable t) &#123;</span><br><span class="line">            logger.error(<span class="string">&quot;Parse heartbeat response failed&quot;</span>, t);</span><br><span class="line">        &#125; <span class="keyword">finally</span> &#123;</span><br><span class="line">            allNum.incrementAndGet();</span><br><span class="line">            <span class="keyword">if</span> (allNum.get() == memberState.peerSize()) &#123;</span><br><span class="line">                beatLatch.countDown();</span><br><span class="line">            &#125;</span><br><span class="line">        &#125;</span><br><span class="line">    &#125;);</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step2：统计心跳包发送响应结果，关键点如下：</p>
<ul>
<li>SUCCESS<br>心跳包成功响应。</li>
<li>EXPIRED_TERM<br>主节点的投票 term 小于从节点的投票轮次。</li>
<li>INCONSISTENT_LEADER<br>从节点已经有了新的主节点。</li>
<li>TERM_NOT_READY<br>从节点未准备好。</li>
</ul>
<p>这些响应值，我们在处理心跳包时重点探讨。</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br></pre></td><td class="code"><pre><span class="line">beatLatch.await(heartBeatTimeIntervalMs, TimeUnit.MILLISECONDS);</span><br><span class="line"><span class="keyword">if</span> (memberState.isQuorum(succNum.get())) &#123;   <span class="comment">// @1</span></span><br><span class="line">    lastSuccHeartBeatTime = System.currentTimeMillis();</span><br><span class="line">&#125; <span class="keyword">else</span> &#123;</span><br><span class="line">    logger.info(<span class="string">&quot;[&#123;&#125;] Parse heartbeat responses in cost=&#123;&#125; term=&#123;&#125; allNum=&#123;&#125; succNum=&#123;&#125; notReadyNum=&#123;&#125; inconsistLeader=&#123;&#125; maxTerm=&#123;&#125; peerSize=&#123;&#125; lastSuccHeartBeatTime=&#123;&#125;&quot;</span>,</span><br><span class="line">                memberState.getSelfId(), DLedgerUtils.elapsed(startHeartbeatTimeMs), term, allNum.get(), succNum.get(), notReadyNum.get(), inconsistLeader.get(), maxTerm.get(), memberState.peerSize(), <span class="keyword">new</span> Timestamp(lastSuccHeartBeatTime));</span><br><span class="line">    <span class="keyword">if</span> (memberState.isQuorum(succNum.get() + notReadyNum.get())) &#123;    <span class="comment">// @2</span></span><br><span class="line">        lastSendHeartBeatTime = -<span class="number">1</span>;</span><br><span class="line">    &#125; <span class="keyword">else</span> <span class="keyword">if</span> (maxTerm.get() &gt; term) &#123;                                                          <span class="comment">// @3</span></span><br><span class="line">        changeRoleToCandidate(maxTerm.get());</span><br><span class="line">    &#125; <span class="keyword">else</span> <span class="keyword">if</span> (inconsistLeader.get()) &#123;                                                            <span class="comment">// @4</span></span><br><span class="line">        changeRoleToCandidate(term);</span><br><span class="line">    &#125; <span class="keyword">else</span> <span class="keyword">if</span> (DLedgerUtils.elapsed(lastSuccHeartBeatTime) &gt; maxHeartBeatLeak * heartBeatTimeIntervalMs) &#123;</span><br><span class="line">        changeRoleToCandidate(term);</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>对收集的响应结果做仲裁，其实现关键点：</p>
<ul>
<li>如果成功的票数大于进群内的半数，则表示集群状态正常，正常按照心跳包间隔发送心跳包(见代码@1)。</li>
<li>如果成功的票数加上未准备的投票的节点数量超过集群内的半数，则立即发送心跳包(见代码@2)。</li>
<li>如果从节点的投票轮次比主节点的大，则使用从节点的投票轮次，或从节点已经有了另外的主节点，节点状态从 Leader 转换为 Candidate。</li>
</ul>
<p>接下来我们重点看一下心跳包的处理逻辑。</p>
<h4 id="2-5-2-handleHeartBeat"><a href="#2-5-2-handleHeartBeat" class="headerlink" title="2.5.2 handleHeartBeat"></a>2.5.2 handleHeartBeat</h4><figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">if</span> (request.getTerm() &lt; memberState.currTerm()) &#123;</span><br><span class="line">    <span class="keyword">return</span> CompletableFuture.completedFuture(<span class="keyword">new</span> HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));</span><br><span class="line">&#125; <span class="keyword">else</span> <span class="keyword">if</span> (request.getTerm() == memberState.currTerm()) &#123;</span><br><span class="line">    <span class="keyword">if</span> (request.getLeaderId().equals(memberState.getLeaderId())) &#123;</span><br><span class="line">        lastLeaderHeartBeatTime = System.currentTimeMillis();</span><br><span class="line">        <span class="keyword">return</span> CompletableFuture.completedFuture(<span class="keyword">new</span> HeartBeatResponse());</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step1：如果主节点的 term 小于 从节点的term，发送反馈给主节点，告知主节点的 term 已过时；如果投票轮次相同，并且发送心跳包的节点是该节点的主节点，则返回成功。</p>
<p>下面重点讨论主节点的 term 大于从节点的情况。</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">synchronized</span> (memberState) &#123;</span><br><span class="line">    <span class="keyword">if</span> (request.getTerm() &lt; memberState.currTerm()) &#123;   <span class="comment">// @1</span></span><br><span class="line">        <span class="keyword">return</span> CompletableFuture.completedFuture(<span class="keyword">new</span> HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));</span><br><span class="line">    &#125; <span class="keyword">else</span> <span class="keyword">if</span> (request.getTerm() == memberState.currTerm()) &#123;  <span class="comment">// @2</span></span><br><span class="line">        <span class="keyword">if</span> (memberState.getLeaderId() == <span class="keyword">null</span>) &#123;</span><br><span class="line">            changeRoleToFollower(request.getTerm(), request.getLeaderId());</span><br><span class="line">            <span class="keyword">return</span> CompletableFuture.completedFuture(<span class="keyword">new</span> HeartBeatResponse());</span><br><span class="line">        &#125; <span class="keyword">else</span> <span class="keyword">if</span> (request.getLeaderId().equals(memberState.getLeaderId())) &#123;</span><br><span class="line">            lastLeaderHeartBeatTime = System.currentTimeMillis();</span><br><span class="line">            <span class="keyword">return</span> CompletableFuture.completedFuture(<span class="keyword">new</span> HeartBeatResponse());</span><br><span class="line">        &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">            <span class="comment">//this should not happen, but if happened</span></span><br><span class="line">            logger.error(<span class="string">&quot;[&#123;&#125;][BUG] currTerm &#123;&#125; has leader &#123;&#125;, but received leader &#123;&#125;&quot;</span>, memberState.getSelfId(), memberState.currTerm(), memberState.getLeaderId(), request.getLeaderId());</span><br><span class="line">            <span class="keyword">return</span> CompletableFuture.completedFuture(<span class="keyword">new</span> HeartBeatResponse().code(DLedgerResponseCode.INCONSISTENT_LEADER.getCode()));</span><br><span class="line">        &#125;</span><br><span class="line">    &#125; <span class="keyword">else</span> &#123;</span><br><span class="line">        <span class="comment">//To make it simple, for larger term, do not change to follower immediately</span></span><br><span class="line">        <span class="comment">//first change to candidate, and notify the state-maintainer thread</span></span><br><span class="line">        changeRoleToCandidate(request.getTerm());</span><br><span class="line">        needIncreaseTermImmediately = <span class="keyword">true</span>;</span><br><span class="line">        <span class="comment">//TOOD notify</span></span><br><span class="line">        <span class="keyword">return</span> CompletableFuture.completedFuture(<span class="keyword">new</span> HeartBeatResponse().code(DLedgerResponseCode.TERM_NOT_READY.getCode()));</span><br><span class="line">    &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>Step2：加锁来处理（这里更多的是从节点第一次收到主节点的心跳包）</p>
<p>代码@1：如果主节的投票轮次小于当前投票轮次，则返回主节点投票轮次过期。</p>
<p>代码@2：如果投票轮次相同。</p>
<ul>
<li>如果当前节点的主节点字段为空，则使用主节点的ID，并返回成功。</li>
<li>如果当前节点的主节点就是发送心跳包的节点，则更新上一次收到心跳包的时间戳，并返回成功。</li>
<li>如果从节点的主节点与发送心跳包的节点ID不同，说明有另外一个Leaer，按道理来说是不会发送的，如果发生，则返回已存在- 主节点，标记该心跳包处理结束。</li>
</ul>
<p>代码@3：如果主节点的投票轮次大于从节点的投票轮次，则认为从节点并为准备好，则从节点进入Candidate 状态，并立即发起一次投票。</p>
<p>心跳包的处理就介绍到这里。</p>
<p>RocketMQ 多副本之 Leader 选举的源码分析就介绍到这里了，为了加强对源码的理解，先梳理流程图如下：<br><img src="https://img-blog.csdnimg.cn/20190817204737273.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70" alt="在这里插入图片描述"></p>
</div>

			<script src="https://my.openwrite.cn/js/readmore.js" type="text/javascript"></script>
			<script>
			var isMobile = navigator.userAgent.match(/(phone|pad|pod|iPhone|iPod|ios|iPad|Android|Mobile|BlackBerry|IEMobile|MQQBrowser|JUC|Fennec|wOSBrowser|BrowserNG|WebOS|Symbian|Windows Phone)/i);
			if (!isMobile) {
			    var btw = new BTWPlugin();
			    btw.init({
			        "id": "vip-container",
			        "blogId": "18019-1573088808868-542",
			        "name": "中间件兴趣圈",
			        "qrcode": "https://img-blog.csdnimg.cn/20190314214003962.jpg",
			        "keyword": "more"
			    });
			}
			</script>
		
      
    </div>
    
    
    

    

    

    

    <footer class="post-footer">
      

      
      
      

      
        <div class="post-nav">
          <div class="post-nav-next post-nav-item">
            
          </div>

          <span class="post-nav-divider"></span>

          <div class="post-nav-prev post-nav-item">
            
              <a href="/posts/bfa9fca1.html" rel="prev" title="探究java8流收集数据原理">
                探究java8流收集数据原理 <i class="fa fa-chevron-right"></i>
              </a>
            
          </div>
        </div>
      

      
      
    </footer>
  </div>
  
  
  
  </article>



    <div class="post-spread">
      
    </div>
  </div>


          </div>
          


          

  



        </div>
        
          
  
  <div class="sidebar-toggle">
    <div class="sidebar-toggle-line-wrap">
      <span class="sidebar-toggle-line sidebar-toggle-line-first"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-middle"></span>
      <span class="sidebar-toggle-line sidebar-toggle-line-last"></span>
    </div>
  </div>

  <aside id="sidebar" class="sidebar">
    
    <div class="sidebar-inner">

      

      
        <ul class="sidebar-nav motion-element">
          <li class="sidebar-nav-toc sidebar-nav-active" data-target="post-toc-wrap">
            文章目录
          </li>
          <li class="sidebar-nav-overview" data-target="site-overview-wrap">
            站点概览
          </li>
        </ul>
      

      <section class="site-overview-wrap sidebar-panel">
        <div class="site-overview">
          <div class="site-author motion-element" itemprop="author" itemscope itemtype="http://schema.org/Person">
            
              <p class="site-author-name" itemprop="name"></p>
              <p class="site-description motion-element" itemprop="description"></p>
          </div>

          <nav class="site-state motion-element">

            
              <div class="site-state-item site-state-posts">
              
                <a href="/archives/">
              
                  <span class="site-state-item-count">139</span>
                  <span class="site-state-item-name">日志</span>
                </a>
              </div>
            

            
              
              
              <div class="site-state-item site-state-categories">
                <a href="/categories/index.html">
                  <span class="site-state-item-count">18</span>
                  <span class="site-state-item-name">分类</span>
                </a>
              </div>
            

            

          </nav>

          

          

          
          

          
          

          

        </div>
      </section>

      
      <!--noindex-->
        <section class="post-toc-wrap motion-element sidebar-panel sidebar-panel-active">
          <div class="post-toc">

            
              
            

            
              <div class="post-toc-content"><ol class="nav"><li class="nav-item nav-level-2"><a class="nav-link" href="#1%E3%80%81DLedger%E5%85%B3%E4%BA%8E%E9%80%89%E4%B8%BB%E7%9A%84%E6%A0%B8%E5%BF%83%E7%B1%BB%E5%9B%BE"><span class="nav-number">1.</span> <span class="nav-text">1、DLedger关于选主的核心类图</span></a><ol class="nav-child"><li class="nav-item nav-level-3"><a class="nav-link" href="#1-1-DLedgerConfig"><span class="nav-number">1.1.</span> <span class="nav-text">1.1 DLedgerConfig</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#1-2-MemberState"><span class="nav-number">1.2.</span> <span class="nav-text">1.2 MemberState</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#1-3-raft%E5%8D%8F%E8%AE%AE%E7%9B%B8%E5%85%B3"><span class="nav-number">1.3.</span> <span class="nav-text">1.3 raft协议相关</span></a><ol class="nav-child"><li class="nav-item nav-level-4"><a class="nav-link" href="#1-3-1-DLedgerClientProtocol"><span class="nav-number">1.3.1.</span> <span class="nav-text">1.3.1 DLedgerClientProtocol</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#1-3-2-DLedgerProtocol"><span class="nav-number">1.3.2.</span> <span class="nav-text">1.3.2 DLedgerProtocol</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#1-3-3-%E5%8D%8F%E8%AE%AE%E5%A4%84%E7%90%86Handler"><span class="nav-number">1.3.3.</span> <span class="nav-text">1.3.3 协议处理Handler</span></a></li></ol></li><li class="nav-item nav-level-3"><a class="nav-link" href="#1-4-DLedgerRpcService"><span class="nav-number">1.4.</span> <span class="nav-text">1.4 DLedgerRpcService</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#1-5-DLedgerLeaderElector"><span class="nav-number">1.5.</span> <span class="nav-text">1.5 DLedgerLeaderElector</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#1-6-DLedgerServer"><span class="nav-number">1.6.</span> <span class="nav-text">1.6 DLedgerServer</span></a></li></ol></li><li class="nav-item nav-level-2"><a class="nav-link" href="#2%E3%80%81%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90Leader%E9%80%89%E4%B8%BE"><span class="nav-number">2.</span> <span class="nav-text">2、源码分析Leader选举</span></a><ol class="nav-child"><li class="nav-item nav-level-3"><a class="nav-link" href="#2-1-DLedgerLeaderElector-%E7%B1%BB%E5%9B%BE"><span class="nav-number">2.1.</span> <span class="nav-text">2.1 DLedgerLeaderElector 类图</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#2-2-%E5%90%AF%E5%8A%A8%E9%80%89%E4%B8%BE%E7%8A%B6%E6%80%81%E7%AE%A1%E7%90%86%E5%99%A8"><span class="nav-number">2.2.</span> <span class="nav-text">2.2 启动选举状态管理器</span></a></li><li class="nav-item nav-level-3"><a class="nav-link" href="#2-3-%E9%80%89%E4%B8%BE%E7%8A%B6%E6%80%81%E6%9C%BA%E7%8A%B6%E6%80%81%E6%B5%81%E8%BD%AC"><span class="nav-number">2.3.</span> <span class="nav-text">2.3 选举状态机状态流转</span></a><ol class="nav-child"><li class="nav-item nav-level-4"><a class="nav-link" href="#2-3-1-maintainAsCandidate-%E6%96%B9%E6%B3%95"><span class="nav-number">2.3.1.</span> <span class="nav-text">2.3.1  maintainAsCandidate 方法</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#2-3-2-maintainAsLeader-%E6%96%B9%E6%B3%95"><span class="nav-number">2.3.2.</span> <span class="nav-text">2.3.2  maintainAsLeader 方法</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#2-3-3-maintainAsFollower%E6%96%B9%E6%B3%95"><span class="nav-number">2.3.3.</span> <span class="nav-text">2.3.3  maintainAsFollower方法</span></a></li></ol></li><li class="nav-item nav-level-3"><a class="nav-link" href="#2-4-%E6%8A%95%E7%A5%A8%E4%B8%8E%E6%8A%95%E7%A5%A8%E8%AF%B7%E6%B1%82"><span class="nav-number">2.4.</span> <span class="nav-text">2.4 投票与投票请求</span></a><ol class="nav-child"><li class="nav-item nav-level-4"><a class="nav-link" href="#2-4-1-voteForQuorumResponses"><span class="nav-number">2.4.1.</span> <span class="nav-text">2.4.1 voteForQuorumResponses</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#2-4-2-handleVote-%E6%96%B9%E6%B3%95"><span class="nav-number">2.4.2.</span> <span class="nav-text">2.4.2 handleVote 方法</span></a></li></ol></li><li class="nav-item nav-level-3"><a class="nav-link" href="#2-5-%E5%BF%83%E8%B7%B3%E5%8C%85%E4%B8%8E%E5%BF%83%E8%B7%B3%E5%8C%85%E5%93%8D%E5%BA%94"><span class="nav-number">2.5.</span> <span class="nav-text">2.5 心跳包与心跳包响应</span></a><ol class="nav-child"><li class="nav-item nav-level-4"><a class="nav-link" href="#2-5-1-sendHeartbeats"><span class="nav-number">2.5.1.</span> <span class="nav-text">2.5.1 sendHeartbeats</span></a></li><li class="nav-item nav-level-4"><a class="nav-link" href="#2-5-2-handleHeartBeat"><span class="nav-number">2.5.2.</span> <span class="nav-text">2.5.2 handleHeartBeat</span></a></li></ol></li></ol></li></ol></div>
            

          </div>
        </section>
      <!--/noindex-->
      

      

    </div>
  </aside>


        
      </div>
    </main>

    <footer id="footer" class="footer">
      <div class="footer-inner">
        <div class="copyright">&copy; <span itemprop="copyrightYear">2021</span>
  <span class="with-love">
    <i class="fa fa-user"></i>
  </span>
  <span class="author" itemprop="copyrightHolder">中间件兴趣圈</span>

  
</div>


  <div class="powered-by">由 <a class="theme-link" target="_blank" href="https://hexo.io">Hexo</a> 强力驱动</div>



  <span class="post-meta-divider">|</span>



  <div class="theme-info">主题 &mdash; <a class="theme-link" target="_blank" href="https://github.com/iissnan/hexo-theme-next">NexT.Muse</a> v5.1.4</div>




        
<div class="busuanzi-count">
  <script async src="https://dn-lbstatics.qbox.me/busuanzi/2.3/busuanzi.pure.mini.js"></script>

  
    <span class="site-uv">
      <i class="fa fa-user"></i>
      <span class="busuanzi-value" id="busuanzi_value_site_uv"></span>
      
    </span>
  

  
    <span class="site-pv">
      <i class="fa fa-eye"></i>
      <span class="busuanzi-value" id="busuanzi_value_site_pv"></span>
      
    </span>
  
</div>








        
      </div>
    </footer>

    
      <div class="back-to-top">
        <i class="fa fa-arrow-up"></i>
        
      </div>
    

    

  </div>

  

<script type="text/javascript">
  if (Object.prototype.toString.call(window.Promise) !== '[object Function]') {
    window.Promise = null;
  }
</script>









  












  
  
    <script type="text/javascript" src="/lib/jquery/index.js?v=2.1.3"></script>
  

  
  
    <script type="text/javascript" src="/lib/fastclick/lib/fastclick.min.js?v=1.0.6"></script>
  

  
  
    <script type="text/javascript" src="/lib/jquery_lazyload/jquery.lazyload.js?v=1.9.7"></script>
  

  
  
    <script type="text/javascript" src="/lib/velocity/velocity.min.js?v=1.2.1"></script>
  

  
  
    <script type="text/javascript" src="/lib/velocity/velocity.ui.min.js?v=1.2.1"></script>
  

  
  
    <script type="text/javascript" src="/lib/fancybox/source/jquery.fancybox.pack.js?v=2.1.5"></script>
  


  


  <script type="text/javascript" src="/js/src/utils.js?v=5.1.4"></script>

  <script type="text/javascript" src="/js/src/motion.js?v=5.1.4"></script>



  
  

  
  <script type="text/javascript" src="/js/src/scrollspy.js?v=5.1.4"></script>
<script type="text/javascript" src="/js/src/post-details.js?v=5.1.4"></script>



  


  <script type="text/javascript" src="/js/src/bootstrap.js?v=5.1.4"></script>



  


  




	





  





  












  





  

  
  <script src="https://cdn1.lncld.net/static/js/av-core-mini-0.6.4.js"></script>
  <script>AV.initialize("NNEhOL0iOcflg8f1U3HUqiCq-gzGzoHsz", "7kSmkbbb3DktmHALlShDsBUF");</script>
  <script>
    function showTime(Counter) {
      var query = new AV.Query(Counter);
      var entries = [];
      var $visitors = $(".leancloud_visitors");

      $visitors.each(function () {
        entries.push( $(this).attr("id").trim() );
      });

      query.containedIn('url', entries);
      query.find()
        .done(function (results) {
          var COUNT_CONTAINER_REF = '.leancloud-visitors-count';

          if (results.length === 0) {
            $visitors.find(COUNT_CONTAINER_REF).text(0);
            return;
          }

          for (var i = 0; i < results.length; i++) {
            var item = results[i];
            var url = item.get('url');
            var time = item.get('time');
            var element = document.getElementById(url);

            $(element).find(COUNT_CONTAINER_REF).text(time);
          }
          for(var i = 0; i < entries.length; i++) {
            var url = entries[i];
            var element = document.getElementById(url);
            var countSpan = $(element).find(COUNT_CONTAINER_REF);
            if( countSpan.text() == '') {
              countSpan.text(0);
            }
          }
        })
        .fail(function (object, error) {
          console.log("Error: " + error.code + " " + error.message);
        });
    }

    function addCount(Counter) {
      var $visitors = $(".leancloud_visitors");
      var url = $visitors.attr('id').trim();
      var title = $visitors.attr('data-flag-title').trim();
      var query = new AV.Query(Counter);

      query.equalTo("url", url);
      query.find({
        success: function(results) {
          if (results.length > 0) {
            var counter = results[0];
            counter.fetchWhenSave(true);
            counter.increment("time");
            counter.save(null, {
              success: function(counter) {
                var $element = $(document.getElementById(url));
                $element.find('.leancloud-visitors-count').text(counter.get('time'));
              },
              error: function(counter, error) {
                console.log('Failed to save Visitor num, with error message: ' + error.message);
              }
            });
          } else {
            var newcounter = new Counter();
            /* Set ACL */
            var acl = new AV.ACL();
            acl.setPublicReadAccess(true);
            acl.setPublicWriteAccess(true);
            newcounter.setACL(acl);
            /* End Set ACL */
            newcounter.set("title", title);
            newcounter.set("url", url);
            newcounter.set("time", 1);
            newcounter.save(null, {
              success: function(newcounter) {
                var $element = $(document.getElementById(url));
                $element.find('.leancloud-visitors-count').text(newcounter.get('time'));
              },
              error: function(newcounter, error) {
                console.log('Failed to create');
              }
            });
          }
        },
        error: function(error) {
          console.log('Error:' + error.code + " " + error.message);
        }
      });
    }

    $(function() {
      var Counter = AV.Object.extend("Counter");
      if ($('.leancloud_visitors').length == 1) {
        addCount(Counter);
      } else if ($('.post-title-link').length > 1) {
        showTime(Counter);
      }
    });
  </script>



  

  

  
  

  

  

  

</body>
</html>
